This repository was archived by the owner on Aug 2, 2021. It is now read-only.

pushsync #1392

Closed
nonsense wants to merge 22 commits into master from pushsync-anton
Conversation

@nonsense
Contributor

@nonsense nonsense commented May 13, 2019

This PR implements the push syncing protocol in a new package, swarm/pushsync.

The entry point to push sync is the Pusher struct.

It uses two interfaces:

  • PushSyncIndex
    • to iterate over chunks to be push synced
    • to mark items synced
    • the new localstore SubscribePush and Set methods implement this.
  • PubSub
    • to communicate
    • pss.PubSub implements this interface
    • Send and Receive

Storer nodes, which accept push-synced content, run the Storer at the swarm.go level.

TODOs:

  • integration test with localstore and pss pubsub
  • simulation test with many nodes

Push tags spec:
https://hackmd.io/9eWxJ_MJS8i04onWg49UBA?both


Supersedes #1323.

I integrated and built on Anton's changes:

  • I kept most of the instrumentation/tracing introduced.
  • Importantly, I kept the change in logic whereby receipts are sent back only by nodes that consider themselves the closest node.
  • I implemented a shortcut (receipt sent to self) if the chunk is closest to self.
  • I improved the resilience of the tests, added comments, and eliminated flakiness.

To run the simulation with a larger number of nodes/chunks/concurrent upload events, flags can be used (note: the maximum number of open files needs to be set high; use ulimit -n on Linux/macOS):

 go test -v -cpu 8 ./storage/pushsync/ -run TestPushSyncSimulation -loglevel=0 -count=30 -nodes=128 -chunks=64 -cases=64

@nonsense nonsense force-pushed the pushsync-anton branch 2 times, most recently from bb20d54 to 339b8aa Compare May 15, 2019 10:56
Comment thread swarm/network/stream/syncer.go Outdated
Comment thread swarm/storage/pushsync/pusher.go Outdated
Comment thread swarm/storage/pushsync/pusher.go Outdated
Contributor

@acud acud left a comment


I have some friction with several choices made here.

  1. The pushsync package should not be in the storage package, but rather somewhere closer to the pull sync code. I don't see a reason why one should live in the storage package and the other in the network package; both should be in the same place.
  2. The test coverage could be improved, just as with the joint efforts made with @nonsense to create more exhaustive test cases for pull sync; the same should be done here. I'd really like to see inclusion and exclusion checks on chunk whereabouts in the network. The simulation tests just pop up a topology and do the usual upload-download tests, which we have previously concluded don't really test anything in particular. Without the merge of the fetcher simplification (where a retrieval request's routing heuristic is much more accurate), it is difficult to conclude that this is stable.

Comment thread cmd/swarm/swarm-smoke/upload_and_sync.go Outdated
Comment thread cmd/swarm/swarm-smoke/upload_and_sync.go Outdated
Comment thread swarm/api/http/server.go Outdated
Comment thread swarm/api/http/server.go Outdated
Comment thread swarm/chunk/chunk.go Outdated
Comment thread swarm/storage/pushsync/pusher_test.go Outdated
Comment thread swarm/storage/pushsync/pusher_test.go Outdated
Comment thread swarm/storage/pushsync/pusher_test.go Outdated
Comment thread swarm/storage/pushsync/pusher_test.go Outdated
Comment thread swarm/storage/pushsync/pusher_test.go Outdated
Contributor

@nolash nolash left a comment


Again, I find PRs like these too long. To improve the quality of code review, sub-components should be introduced incrementally. I'm not sure why we keep doing this.

// Storer run storer nodes to handle the reception of push-synced chunks
// that fall within their area of responsibility.
// The protocol makes sure that
// - the chunks are stored and synced to their nearest neighbours and
Contributor


// The protocol makes sure that
// - the chunks are stored and synced to their nearest neighbours

This is provided that indeed I am wrong about the only-one-node guarantee of pss prox sending, I presume.

Comment thread swarm/storage/pushsync/storer.go Outdated
}

// NewStorer constructs a Storer
// Storer run storer nodes to handle the reception of push-synced chunks
Contributor


storer nodes

Why is this plural? One node, one storer, right?

Comment thread swarm/storage/pushsync/storer.go
Comment thread swarm/storage/pushsync/storer.go
Comment thread swarm/storage/pushsync/storer.go Outdated
Comment thread swarm/storage/pushsync/pusher_test.go Outdated
}

// TestPushSyncAndStoreWithLoopbackPubSub tests the push sync protocol
// push syncer node communicate with storers via mock PubSub
Contributor


I would appreciate comments that describe the test procedure a bit more in detail.

Comment thread swarm/storage/pushsync/pushsync_simulation_test.go Outdated
// the created tag indicates the uploader and downloader nodes
tagname := fmt.Sprintf("tag-%v-%v-%d", label(uid[:]), label(did[:]), i)
log.Debug("uploading", "peer", uid, "chunks", chunkCnt, "tagname", tagname)
tag, what, err := upload(ctx, p.store.(*localstore.DB), p.tags, tagname, chunkCnt)
Contributor


Comment thread swarm/storage/pushsync/pushsync_simulation_test.go Outdated
Comment thread swarm/storage/pushsync/pushsync_simulation_test.go Outdated
@nonsense nonsense force-pushed the pushsync-anton branch 5 times, most recently from a86f8ed to c3275ca Compare June 11, 2019 09:13
@acud
Contributor

acud commented Jun 12, 2019

@nonsense the force push after the review marked everything we commented on as outdated, but looking at the code shows that not much has changed.

@nonsense
Contributor Author

@acud sorry about that, I won't force push this PR again, unless it diverges a lot from master.

I think using the Resolve / Unresolve button will be helpful for PRs with this many comments, as it hides threads that have already been covered.

@acud acud mentioned this pull request Jun 18, 2019
3 tasks
@nonsense
Contributor Author

@acud sorry, had to rebase on master.

bucket.Store(bucketKeyNetStore, netStore)

noSyncing := &stream.RegistryOptions{Syncing: stream.SyncingDisabled}
noSyncing := &stream.RegistryOptions{Syncing: stream.SyncingDisabled, SyncUpdateDelay: 50 * time.Millisecond}
Contributor Author

@nonsense nonsense Jun 19, 2019


This is rather confusing to me. We have syncing disabled, but we also need to set SyncUpdateDelay to 50ms (I know it is 10 or 15 seconds by default). Why is that, @acud?

@nonsense nonsense force-pushed the pushsync-anton branch 10 times, most recently from b80a859 to d7ab14f Compare July 9, 2019 15:11
zelig and others added 18 commits September 13, 2019 10:10
- storer needs to take netstore not localstore to put the chunk
  so that fetchers created earlier could respond
storage/pushsync: update NetStore api from master

storage/pushsync: add sleeps and a bit more tracing, change subscription wait delay

network: try sending receipt only if there is no closer peer

storage/pushsync: use netstore, rather than localstore

storage/netstore: logs with node id

storage/pushsync: opentracing

storage/pushsync: propagate origin on receipts

is push synced - smoke test

network: kademlia closer peer

storage: very high retry timeout

measure send chunk

rlp timer

pss timers and goroutine sendChunk

traces for chunks in localstore and subscribepush

try minbinsize: 3

increase search timeout ; move tracing before nil check

link netstore and delivery

rename tag New to tag Create

swarm-smoke: fix check max prox hosts for pull/push sync modes

tag roundtrip wip

more logs

kad as part of pusher?

kad to storer; revert pss change

metrics to pss

outbox len metric

emit metrics once every 10sec.

pss: refactor

storage/pushsync: closer than me trace

chunk: adding XOR comparison

disable pushsync tests, as they are not setting up kad properly
    - WaitTillDone does check initially
    - introduce IncN to increment with n
    - api: inspector to use tag.Done
    - unexport context, span, and Context() to be used by http server
    - minor improvements in logging
    - calls on Kademlia directly on Pss struct
    - add IsClosestTo function to pubsub using kademlia.IsClosestTo
    - move package from storage to toplevel
    - only closest peer to address returns a receipt
    - IsClosestTo(addr) is now part of the PubSub interface
    - rename TestPushSyncAndStoreWithLoopback to TestProtocol
    - for TestPusher and , IsClosest is mocked properly
    - remove Origin field from receipt message
    - pushed item remembers first and last sent time
    - retryInterval is dynamically set as 2 * average roundtrip (excluding outliers)
    - early send check is removed from unit test
    - receipts channel is just []byte for address
    - we are using multi-set for setting synced status on chunks
    - tag increment now follows multiset and not incorrectly when the receipt arrives
      for this remember syncedItems is needed alongside syncedAddrs
    - in pusher sync loop, we use static context
    - correctly unsubscribe DB.SubscribePush
    - if pusher is closest node to a chunk, chunk is not sent using pubsub, but receipt is
      directly sent to self using a shortcut
    - therefore no self send is needed in storer
    - loopBack pubsub is shared in the protocol unit test but wrapped in different testPubSub structs
      to control the behaviour of IsClosestTo function
    - added delayResponse to TestProtocol too
    - testPushSyncIndex SubscribePush now increments tag StateStored correctly only first time
    - fix checkTags to wait for synced status (it caused occasional flakiness)
    - extract simulation test parameters (nodes/chunks/testcases) as command line flags, default is 16/16/16
  - introduce custom logger on pusher/storer
  - dynamic setting of retryinterval extracted
  - timeout changed back
  - fix and use testutil.checkTags
  - use tag context
  - tags.Create and NewTag has context argument
  - fix pusher.Close before localstore
 - changed order of select cases
 - have Tests on top followed by helpers
 - add sent,synced check to testutil.CheckTag
 - add timeout to close wait
 - adapt to testutil.Init()
 - length protection to label helper
 - resolve rebase conflicts
// * wait until the uploaded chunks are synced
// * downloader downloads the chunk
// Testcases are run concurrently
func TestPushsyncSimulation(t *testing.T) {
Member


This test is failing for two reasons:

  • snapshot loading fails because there are no peers and connections in kademlia (the WaitTillSnapshotRecreated simulation method requires kademlia connections), which means that we must always have the bzz protocol in simulation services
  • netstore is missing the RemoteGet function

These changes solve the problems:

diff --git a/pushsync/simulation_test.go b/pushsync/simulation_test.go
index bf50748cd..5b2bffe75 100644
--- a/pushsync/simulation_test.go
+++ b/pushsync/simulation_test.go
@@ -200,6 +200,7 @@ func newServiceFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (node.Servic
        bucket.Store(bucketKeyNetStore, netStore)
 
        r := retrieval.New(kad, netStore, kad.BaseAddr())
+       netStore.RemoteGet = r.RequestFromPeers
 
        pubSub := pss.NewPubSub(ps)
        // setup pusher
@@ -216,13 +217,20 @@ func newServiceFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (node.Servic
                os.RemoveAll(dir)
        }
 
-       return &RetrievalAndPss{r, ps}, cleanup, nil
+       bzz := network.NewBzz(&network.BzzConfig{
+               OverlayAddr:  addr.Over(),
+               UnderlayAddr: addr.Under(),
+               HiveParams:   network.NewHiveParams(),
+       }, kad, nil, nil, nil, nil, nil)
+
+       return &RetrievalAndPss{r, ps, bzz}, cleanup, nil
 }
 
 // implements the node.Service interface
 type RetrievalAndPss struct {
        retrieval *retrieval.Retrieval
        pss       *pss.Pss
+       bzz       *network.Bzz
 }
 
 func (s *RetrievalAndPss) APIs() []rpc.API {
@@ -230,11 +238,15 @@ func (s *RetrievalAndPss) APIs() []rpc.API {
 }
 
 func (s *RetrievalAndPss) Protocols() []p2p.Protocol {
-       return append(s.retrieval.Protocols(), s.pss.Protocols()...)
+       return append(append(s.retrieval.Protocols(), s.pss.Protocols()...), s.bzz.Protocols()...)
 }
 
 func (s *RetrievalAndPss) Start(srv *p2p.Server) error {
-       err := s.retrieval.Start(srv)
+       err := s.bzz.Start(srv)
+       if err != nil {
+               return err
+       }
+       err = s.retrieval.Start(srv)
        if err != nil {
                return err
        }
@@ -242,7 +254,11 @@ func (s *RetrievalAndPss) Start(srv *p2p.Server) error {
 }
 
 func (s *RetrievalAndPss) Stop() error {
-       err := s.retrieval.Stop()
+       err := s.bzz.Stop()
+       if err != nil {
+               return err
+       }
+       err = s.retrieval.Stop()
        if err != nil {
                return err
        }

Comment thread api/pullsync/client.go
@@ -0,0 +1,14 @@
package pullsync
Member


I suppose this file was committed by mistake.

Comment thread api/pullsync/server.go
@@ -0,0 +1,9 @@
package pullsync
Member


This file was probably also committed by mistake.

@nonsense
Contributor Author

nonsense commented Sep 17, 2019

I tested 1eb077ec5 and it is failing all the smoke tests - it doesn't seem to be possible to sync a 100MB file with push sync only (while pull sync is disabled), so we definitely have some regressions with this code. I will see what happens when we increase the retry timeout, but in my opinion we should start from the last known good commit and iterate slowly through all the additional changes.

EDIT: We agreed with @janos to work on the HEAD of this branch, as it has already been reviewed and we don't want to do double-work.

@nonsense
Contributor Author

@zelig as an owner of this feature could you please open a PR, so that I can submit reviews? I think we should not merge this until:

  1. we have a feature flag where push sync is disabled by default until we have very high confidence that it works.
  2. we have benchmarked push sync and it works according to our previous tests with comparable speeds - for example commit ae01b11a8.

Comment thread swarm.go
}

pubsub := pss.NewPubSub(self.ps)
self.pushSync = pushsync.NewPusher(localStore, pubsub, self.tags)
Member


Since push sync is constructed here, it should be closed in Swarm.Stop, before the netstore is closed.

@janos janos mentioned this pull request Sep 19, 2019
@nonsense
Copy link
Copy Markdown
Contributor Author

Closing in favour of #1782

@nonsense nonsense closed this Sep 23, 2019
@nonsense nonsense deleted the pushsync-anton branch September 30, 2019 10:45